Skip to main content

第 7 课:序列模型

之前的几课中,我们学习了神经网络的实现方式,但是如何实现序列模型结构呢?

序列模型是处理时间序列、文本、语音等顺序数据的核心工具,常见模型包括:

模型简介适用场景
RNN (Recurrent Neural Network)最基本的循环网络,能记住前面的状态简单的时间序列、短文本
LSTM (Long Short-Term Memory)改进版 RNN,引入“记忆门控”机制,能处理长期依赖NLP、语音识别
GRU (Gated Recurrent Unit)类似 LSTM,但结构更简单资源有限的设备
Transformer并行处理序列,不依赖循环结构,靠“注意力机制”捕捉长依赖BERT, GPT 等 NLP 模型基础
Temporal Convolutional Network (TCN)用卷积代替循环网络,提升速度和稳定性时间序列预测
Seq2Seq(编码器-解码器结构)输入输出均为序列,如机器翻译翻译、摘要、对话系统

我们会从 RNN → LSTM/GRU → Transformer 的路径逐步展开。

注:NLP 是自然语言处理


(一)RNN

目标功能:用字符序列预测下一个字符,比如 “hell” → “o”,即训练模型生成句子或单词。

推荐数据集:tiny Shakespeare(常用RNN测试数据集,只有几百 KB,来自莎士比亚作品片段 karpathy/char-rnn

"hello" 为例,训练模型让它学会从 "h""e""l""l""o"

序列模型仍然需要 DataSet 和 DataLoader

项目结构

char_rnn_project/

├── char_rnn.ipynb # 主 Jupyter Notebook
└── data/
└── tinyshakespeare.txt # 数据文件(我们会下载)

步骤一: 下载数据

我们用 Python 自动下载 karpathy/char-rnn 的文本:

import os
import requests

os.makedirs("data", exist_ok=True)
url = "https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt"
save_path = "data/tinyshakespeare.txt"

if not os.path.exists(save_path):
    response = requests.get(url)
    with open(save_path, "w", encoding="utf-8") as f:
        f.write(response.text)

print("数据已保存到 data/tinyshakespeare.txt")

步骤二: 数据预处理

我们把所有字符转换为整数索引,生成序列对(如 "hell" → "ello"):

读取数据

with open("data/tinyshakespeare.txt", "r", encoding="utf-8") as f:
    text = f.read()

建立词汇表

# 建立字符到整数的映射
print(set(text))  
chars = sorted(list(set(text)))  # set(text) 提取 text 中所有不同的字符,用集合去重。
print(chars)
vocab_size = len(chars)
print(f"共 {vocab_size} 个唯一字符")
{'!', 'U', 'c', '?', ':', 'i', 'W', 'x', 'R', 'g', 'M', 'H', 'B', 'N', 'm', 'A', 'S', 'l', 'F', ' ', 'o', '\n', 'k', 'P', '3', 'C', 'Y', ',', 'D', 'y', 'r', 'e', ';', 's', 'J', '&', 'b', 'a', '-', 'n', 'u', 'L', 'O', 'E', 'G', 'X', 'p', 't', '.', 'Z', 'K', 'f', 'I', 'z', 'j', '$', 'w', 'v', "'", 'V', 'Q', 'q', 'h', 'd', 'T'}
['\n', ' ', '!', '$', '&', "'", ',', '-', '.', '3', ':', ';', '?', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z']
共 65 个唯一字符
# 构建一个字典,把每个字符映射到一个整数编号。
stoi = {ch: i for i, ch in enumerate(chars)} # stoi 是 "string to index" 的缩写。
itos = {i: ch for ch, i in stoi.items()}     # itos 是 "index to string" 的缩写。

设计把 string 转换为整数列表的函数 encode,以及把整数列表转换回 string 的函数 decode。

# 编码函数
def encode(s): 
    return [stoi[c] for c in s]
def decode(l): 
    return ''.join([itos[i] for i in l])

步骤三: 加载数据

import torch
from torch.utils.data import Dataset, DataLoader

class CharDataset(Dataset):
    def __init__(self, text, block_size):
        self.data = encode(text) # 直接把文件中的原始文本当成一个长向量
        self.block_size = block_size

    def __len__(self):
        return len(self.data) - self.block_size

    def __getitem__(self, idx): # 定义如何获取一个样本
        chunk = self.data[idx:idx + self.block_size + 1]
        x = torch.tensor(chunk[:-1], dtype=torch.long)
        y = torch.tensor(chunk[1:], dtype=torch.long)
        return x, y
        # 例如 :
        # data: [h e l l o] = [0 1 2 2 3]
        # block_size=4
        # chunk= data[0:5] = [0 1 2 2 3]
        # x = chunk[:-1] = [0 1 2 2]
        # y = chunk[1:] = [1 2 2 3] 

block_size = 64
dataset = CharDataset(text, block_size)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

步骤四: 定义 Char-RNN 模型

import torch.nn as nn  # nn 包含了所有常用层(如 Linear, RNN, Embedding 等)。

class CharRNN(nn.Module):
    def __init__(self, vocab_size, embed_size=128, hidden_size=256):
        # vocab_size:字符表大小(比如 65),表示有多少种不同的字符;
        # embed_size:嵌入维度,每个字符会变成一个 embed_size 维的向量;
        # hidden_size:RNN 中隐藏状态的维度(记忆容量);
        # 默认嵌入维度为 128,隐藏状态为 256。
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)        # 定义嵌入层
        # 输入: [batch, seq_len, vocab_size]
        # 输出: [batch, seq_len, embed_size]
        # 其中 vocab_size 为: [0, 4, 9] (代表字符 a, e, i),batch = 3
        # 其中 embed_size 为: [[...], [...], [...]]  # 每个是 128维向量
        self.rnn = nn.RNN(embed_size, hidden_size, batch_first=True) # 定义 RNN 层
        # batch_first=True 表示输入张量维度是 [batch, seq_len, features]
        # 输入:[batch, seq_len, embed_size]
        # 输出:[batch, seq_len, hidden_size]
        # 其中 embed_size 为: [[...], [...], [...]]
        # 其中 hidden_size为: [[...], [...], [...]]  # 每个是 256维向量
        
        self.fc = nn.Linear(hidden_size, vocab_size)                 # 定义全连接层
        # 把 RNN 的输出(维度是 hidden_size)投影回字符索引空间(维度是 vocab_size);
        # RNN输出: [batch, seq_len, hidden_size]
        # FC输出:  [batch, seq_len, vocab_size]

    def forward(self, x, h=None): # “正向传播”函数,每次调用 model(x) 时会触发
        x = self.embedding(x)     # x 是一个形如 [batch, seq_len] 的张量(字符索引);
        out, h = self.rnn(x, h)
        out = self.fc(out)
        return out, h
        # out:所有时间步的输出,形状是 [batch, seq_len, hidden_size]
        # h:最后一个时间步的隐藏状态,形状是 [1, batch, hidden_size]

步骤五: 训练模型

# 自动选择设备:GPU 优先
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

# 模型 & 损失函数 & 优化器
model = CharRNN(vocab_size).to(device)      # 共 65 个唯一字符
optimizer = torch.optim.Adam(model.parameters(), lr=0.003)
loss_fn = nn.CrossEntropyLoss()             # 多分类损失

model.train()  # 设为训练模式(不是必须,但推荐)

# 训练循环
for epoch in range(10):
    for xb, yb in dataloader:
        xb = xb.to(device)  # 输入转到 GPU
        yb = yb.to(device)  # 标签也要转到 GPU

        out, _ = model(xb)  # 前向传播
        # out.shape == [batch, seq_len, vocab_size] (每个时间步对 vocab 的 logits)
        # _ 是最后时刻的隐藏态,这里不需要就丢弃。
        loss = loss_fn(out.view(-1, vocab_size), yb.view(-1))

        # 反向传播
        optimizer.zero_grad()  # 清空梯度
        loss.backward()        # 反向传播
        optimizer.step()       # 参数更新

    print(f"Epoch {epoch} | Loss: {loss.item():.4f}")

现在读取的是整个 tinyshakespeare.txt 文件的内容(大约 1MB 文本),然后把整个文本编码为一长串字符索引(整数),这意味着你拥有的是一个长度为 ~1,000,000 的 索引列表,形如:

[47, 30, 12, 12, 53,  ...]  # 每个数字代表一个字符

你不会直接把整个序列塞给模型,而是将其按小块切分为很多小样本(block)

例如,text = "hello world",block_size=4,则

chunk = text[i:i+block_size+1]       # 比如 "hello"
x = chunk[:-1] = "hell"   # 模型输入
y = chunk[1:]  = "ello"   # 目标输出(每个字符预测它右边的字符)

每个 block 分成输入 x 和目标 y(x = block[:-1], y = block[1:]),用 DataLoader 按 batch 送入模型训练

而 一个 batch 是由 32 个 block 组成的“迷你训练集”;


如果最后几个字符不够一个 block 怎么办?
def __len__(self):
    return len(self.data) - self.block_size

这意味着你最多只能访问 self.__getitem__(0)self.__getitem__(len(self.data) - self.block_size)

确保每个 __getitem__(idx) 调用时,切片 idx : idx + block_size + 1 都是安全的。

因此最后不够组成一个block 的那些会被直接舍弃掉。因此不会存在“最后几个字符不够组成 block”的问题,因为 __len__ 已经提前限制了 idx 最大值,确保每次 idx:idx+65 都不会越界。

image-20251004225935040

如果最后几个 block 不够组成一个 batch 怎么办?

PyTorch 的 DataLoader 默认会处理这个问题:

  • 最后不足一个 batch_size 的部分,会作为一个“小 batch”送进去(比如 batch_size=32,但最后只有 12 个)
  • 除非你设置 drop_last=True,否则默认是保留它。

示例:

len(dataset) = 1000
batch_size = 32
→ 每 epoch 共 32 * 31 = 992 + 最后一个小 batch (8条)
如何判断 “到了最后一个 batch”?

在 PyTorch 的 DataLoader 中,每个 epoch 都会自动从头到尾迭代完一遍 dataset,你不需要手动判断是否是最后一个 batch,除非你有特殊逻辑。

如果你确实想知道,比如想在最后一个 batch 做额外处理,你可以这样做:

for i, (xb, yb) in enumerate(dataloader):
    if i == len(dataloader) - 1:
        print("This is the last batch!")

步骤六: 文本生成函数

def generate(model, start_text="To be", length=100):
    model.eval()
    device = next(model.parameters()).device  # 自动获取模型所在设备

    input_ids = torch.tensor(encode(start_text), dtype=torch.long).unsqueeze(0).to(device)
    hidden = None
    result = list(start_text)

    for _ in range(length):
        out, hidden = model(input_ids[:, -1:], hidden)
        probs = torch.softmax(out[:, -1, :], dim=-1)
        next_id = torch.multinomial(probs, num_samples=1).item()
        result.append(itos[next_id])
        
        next_input = torch.tensor([[next_id]], dtype=torch.long).to(device)
        input_ids = torch.cat([input_ids, next_input], dim=1)

    return ''.join(result)


print(generate(model, start_text="The king", length=200))

(二)GRU 门控单元

数据和调用都不用变,只用变模型定义以及之后的内容

我们来修改原来的 CharRNN 模型为 CharGRU

步骤一:模型定义:CharGRU

import torch
import torch.nn as nn

class CharGRU(nn.Module):
    def __init__(self, vocab_size, embed_size=128, hidden_size=256):
        """
        CharGRU 是一个基于字符级的文本生成模型,使用 GRU 结构。
        参数说明:
        - vocab_size: 字符表的大小(例如 65)
        - embed_size: 嵌入向量的维度(每个字符被编码成一个向量)
        - hidden_size: GRU 的隐藏状态维度(决定记忆能力)
        """
        super().__init__()

        # 嵌入层:将输入的字符索引转换为稠密向量
        self.embedding = nn.Embedding(vocab_size, embed_size)

        # GRU 层:处理嵌入后的序列数据
        self.gru = nn.GRU(embed_size, hidden_size, batch_first=True)

        # 全连接层:将每个时间步的 GRU 输出映射为对下一个字符的预测(logits)
        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, x, h=None):
        """
        前向传播函数:
        - x: 输入张量 [batch_size, seq_len],每个元素是字符索引
        - h: 可选的初始隐藏状态(默认为 None)
        返回:
        - out: 每个时间步的 logits,形状 [batch_size, seq_len, vocab_size]
        - h: 最后的隐藏状态,可用于继续生成
        """
        x = self.embedding(x)             # [B, T] → [B, T, E]
        out, h = self.gru(x, h)           # out: [B, T, H]
        out = self.fc(out)                # [B, T, H] → [B, T, V]
        return out, h

步骤二:模型训练代码

# 自动选择设备(优先使用 GPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

# 创建模型并放到设备上
model = CharGRU(vocab_size).to(device)

# 优化器和损失函数
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)  # 学习率建议小一点
loss_fn = nn.CrossEntropyLoss()  # 多分类任务的标准损失函数

# 设置为训练模式(不是必须,但推荐)
model.train()

# 训练循环(共 10 个 epoch)
for epoch in range(10):
    total_loss = 0
    count = 0

    for xb, yb in dataloader:
        xb, yb = xb.to(device), yb.to(device)        # 将输入和目标送入 GPU
        out, _ = model(xb)                            # 前向传播,输出形状 [B, T, V]

        # 将输出和目标 reshape 成 [B*T, V] 和 [B*T],以匹配 CrossEntropyLoss 的格式
        loss = loss_fn(out.view(-1, vocab_size), yb.view(-1))

        # 反向传播与优化
        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)  # 防止梯度爆炸
        optimizer.step()

        total_loss += loss.item()
        count += 1

    avg_loss = total_loss / count
    print(f"Epoch {epoch} | Avg Loss: {avg_loss:.4f}")

步骤三:文本生成函数

def generate(model, start_text="To be", length=100):
    """
    使用训练好的模型生成文本
    参数:
    - model: 已训练好的 CharGRU 模型
    - start_text: 起始字符串
    - length: 要生成的字符总数
    返回:
    - 生成的字符串
    """
    model.eval()  # 设置为推理模式
    device = next(model.parameters()).device

    # 编码起始字符串,并加上 batch 维度:[1, len]
    input_ids = torch.tensor(encode(start_text), dtype=torch.long).unsqueeze(0).to(device)
    hidden = None
    result = list(start_text)

    for _ in range(length):
        # 只拿最后一个时间步输入模型(避免太长)
        out, hidden = model(input_ids[:, -1:], hidden)   # [1, 1] → [1, 1, vocab_size]

        # 取最后时间步的输出,并 softmax 得到概率分布
        probs = torch.softmax(out[:, -1, :], dim=-1)     # [1, vocab_size]

        # 从分布中随机采样下一个字符索引
        next_id = torch.multinomial(probs, num_samples=1).item()

        # 将字符加入结果中
        result.append(itos[next_id])

        # 把这个字符作为下一个输入,拼到 input_ids 后面
        next_input = torch.tensor([[next_id]], dtype=torch.long).to(device)
        input_ids = torch.cat([input_ids, next_input], dim=1)

    return ''.join(result)

# 示例调用
print(generate(model, start_text="To be", length=300))


(三)LSTM 模型

你只需将 GRU 替换为 LSTM,其余数据加载、训练流程和文本生成代码 几乎不变,只需在模型定义中改用 nn.LSTM 并适配 LSTM 的返回值格式。

步骤一:模型定义CharLSTM

import torch
import torch.nn as nn

class CharLSTM(nn.Module):
    def __init__(self, vocab_size, embed_size=128, hidden_size=256):
        """
        CharLSTM 是字符级语言模型,基于 LSTM 构建。
        参数:
        - vocab_size:字符种类总数
        - embed_size:嵌入维度(每个字符映射为一个向量)
        - hidden_size:LSTM 的隐藏状态维度
        """
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(embed_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, x, h=None):
        """
        前向传播:
        - x: [batch_size, seq_len] 字符索引输入
        - h: (h_0, c_0) 初始隐藏状态和记忆状态(可以为 None)
        返回:
        - out: logits 预测 [batch_size, seq_len, vocab_size]
        - h: (h_n, c_n) 最终的隐藏状态元组
        """
        x = self.embedding(x)           # [B, T] -> [B, T, E]
        out, h = self.lstm(x, h)        # out: [B, T, H]; h 是 tuple (h_n, c_n)
        out = self.fc(out)              # [B, T, H] -> [B, T, V]
        return out, h

步骤二:训练代码

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

model = CharLSTM(vocab_size).to(device)  # ✅ 替换成 LSTM 模型
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
loss_fn = nn.CrossEntropyLoss()
model.train()

for epoch in range(10):
    total_loss = 0
    count = 0

    for xb, yb in dataloader:
        xb, yb = xb.to(device), yb.to(device)

        out, _ = model(xb)  # ✅ 不使用 hidden,不影响训练
        loss = loss_fn(out.view(-1, vocab_size), yb.view(-1))

        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()

        total_loss += loss.item()
        count += 1

    avg_loss = total_loss / count
    print(f"Epoch {epoch} | Avg Loss: {avg_loss:.4f}")

步骤三:文本生成函数

LSTM 返回的 hidden 是一个 tuple:(h, c),需要保持结构一致。

def generate(model, start_text="To be", length=100):
    """
    使用训练好的 CharLSTM 模型生成文本
    - start_text: 起始提示文本
    - length: 要生成的字符数
    """
    model.eval()
    device = next(model.parameters()).device

    input_ids = torch.tensor(encode(start_text), dtype=torch.long).unsqueeze(0).to(device)
    hidden = None
    result = list(start_text)

    for _ in range(length):
        # 只输入最后一个字符
        out, hidden = model(input_ids[:, -1:], hidden)

        # 注意:hidden 是 tuple (h, c),会被下次继续使用
        probs = torch.softmax(out[:, -1, :], dim=-1)
        next_id = torch.multinomial(probs, num_samples=1).item()

        result.append(itos[next_id])
        next_input = torch.tensor([[next_id]], dtype=torch.long).to(device)
        input_ids = torch.cat([input_ids, next_input], dim=1)

    return ''.join(result)

# 示例调用
print(generate(model, start_text="O Romeo", length=300))

(四)Transformer 模型

Transformer 和之前有点不一样,下面给出从数据下载 → 预处理 → Dataset/DataLoader → Transformer 模型(GPT-like)→ 训练 → 文本生成的完整代码(按步骤分块)。放进同一个 Jupyter Notebook 即可运行。数据会保存在 data/ 目录下。

说明:本实现是字符级自回归 Transformer(类似简化版 GPT),包含嵌入、位置编码、N 层 TransformerEncoder、因果(下三角)mask,以及线性输出头。


步骤一:位置编码

Transformer 不自带顺序感,需要给序列位置注入信息。这里使用标准的正弦位置编码(可学习位置嵌入也行)。

class SinusoidalPositionalEncoding(nn.Module):
    """
    经典正弦位置编码。对每个位置 pos 生成一个 dim 向量,偶数/奇数维用 sin/cos。
    输出 shape 与输入一致:[B, T, E]
    """
    def __init__(self, d_model, max_len=4096):
        super().__init__()
        pe = torch.zeros(max_len, d_model)     # [max_len, E]
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)  # [max_len, 1]
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        # 偶数维
        pe[:, 0::2] = torch.sin(position * div_term)
        # 奇数维
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)                   # [1, max_len, E]
        self.register_buffer('pe', pe)         # 不作为参数训练,但随模型保存/加载

    def forward(self, x):
        # x: [B, T, E]
        T = x.size(1)
        return x + self.pe[:, :T, :]

步骤二:Causal Mask

确保第 t 个位置只能看见 <= t 的位置,防止“偷看未来”。

def generate_square_subsequent_mask(sz: int, device):
    """
    生成下三角 mask(True 表示屏蔽,False 表示可见)
    给 nn.Transformer/EncoderLayer 的 attn_mask 使用。
    shape: [T, T]
    """
    # 上三角(含对角线以上)设为 True(不可见)
    mask = torch.triu(torch.ones(sz, sz, device=device), diagonal=1).bool()
    return mask  # True=mask,False=keep

步骤三: 定义模型

简化 GPT-like Transformer

  • 字符嵌入 + 位置编码
  • N 层 TransformerEncoderLayer(自注意力 + FFN)
  • 线性输出到 vocab_size
  • 使用因果 mask保证自回归
class CharTransformer(nn.Module):
    def __init__(
        self,
        vocab_size,
        embed_size=256,
        num_heads=8,
        num_layers=6,
        ff_hidden=1024,
        dropout=0.1,
        max_len=4096,
        block_size=128,
    ):
        """
        参数:
        - vocab_size: 字符表大小
        - embed_size: 词嵌入/模型维度 d_model
        - num_heads: Multi-Head 注意力头数
        - num_layers: Transformer Encoder 层数
        - ff_hidden: 前馈层隐藏维度
        - dropout: dropout 概率
        - max_len: 支持的最大序列长度(用于位置编码)
        - block_size: 训练时上下文窗口长度
        """
        super().__init__()
        self.block_size = block_size

        self.token_emb = nn.Embedding(vocab_size, embed_size)
        self.pos_enc = SinusoidalPositionalEncoding(embed_size, max_len=max_len)

        encoder_layer = nn.TransformerEncoderLayer(
            d_model=embed_size,
            nhead=num_heads,
            dim_feedforward=ff_hidden,
            dropout=dropout,
            batch_first=True,      # 让输入输出都是 [B, T, E]
            activation="gelu"
        )
        self.encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)

        self.ln_f = nn.LayerNorm(embed_size)
        self.head = nn.Linear(embed_size, vocab_size)

    def forward(self, x):
        """
        x: [B, T](每个位置是字符索引)
        返回:
        logits: [B, T, V]
        """
        B, T = x.shape
        device = x.device
        assert T <= self.block_size, f"输入序列长度 {T} 超过 block_size {self.block_size}"

        tok = self.token_emb(x)           # [B, T, E]
        h = self.pos_enc(tok)             # [B, T, E]

        # 因果 mask:形状 [T, T],True=mask
        attn_mask = generate_square_subsequent_mask(T, device)
        # TransformerEncoder 的 key_padding_mask 是 [B, T](True=padding);这里不需要就 None
        out = self.encoder(h, mask=attn_mask)  # [B, T, E]

        out = self.ln_f(out)
        logits = self.head(out)            # [B, T, V]
        return logits

步骤四:训练循环

  • 与 RNN/GRU/LSTM 基本一致:CrossEntropyLoss 输入 [B*T, V],目标 [B*T]
  • 添加梯度裁剪与平均 loss 打印。
  • 自动选择设备。
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

model = CharTransformer(
    vocab_size=vocab_size,
    embed_size=256,
    num_heads=8,
    num_layers=6,
    ff_hidden=1024,
    dropout=0.1,
    max_len=4096,
    block_size=block_size,
).to(device)

optimizer = torch.optim.AdamW(model.parameters(), lr=3e-4, weight_decay=0.01)
loss_fn = nn.CrossEntropyLoss()

model.train()
epochs = 10

for epoch in range(epochs):
    total_loss = 0.0
    steps = 0
    for xb, yb in dataloader:
        xb = xb.to(device)  # [B, T]
        yb = yb.to(device)  # [B, T]

        logits = model(xb)  # [B, T, V]
        loss = loss_fn(logits.view(-1, vocab_size), yb.view(-1))

        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()

        total_loss += loss.item()
        steps += 1

    avg = total_loss / steps
    ppl = math.exp(avg)
    print(f"Epoch {epoch} | Avg Loss: {avg:.4f} | PPL: {ppl:.2f}")

说明:Transformer 往往比 RNN 更吃算力,默认 6 层/8 头已经不小。若显存或速度吃紧,可先试:num_layers=2~4, embed=192, heads=6, ff=768


步骤五:采样/生成函数

  • 使用自回归生成:每次喂入最后一个 token(或最后 block_size 个),预测下一个。
  • temperature 控制多样性( >1 更发散,<1 更保守)。
  • top_k 只保留概率最大的 k 个 token 再采样,能抑制杂乱。
@torch.no_grad()
def generate(
    model,
    start_text="To be",
    length=200,
    temperature=1.0,
    top_k=None
):
    model.eval()
    device = next(model.parameters()).device

    # 将起始文本编码为张量并放到设备上,形状 [1, L]
    input_ids = torch.tensor(encode(start_text), dtype=torch.long).unsqueeze(0).to(device)

    result = list(start_text)

    for _ in range(length):
        # 只取最后 block_size 个 token 作为模型输入(滚动窗口)
        if input_ids.size(1) > model.block_size:
            x_cond = input_ids[:, -model.block_size:]
        else:
            x_cond = input_ids

        logits = model(x_cond)                 # [1, T, V]
        last_logits = logits[:, -1, :] / max(temperature, 1e-6)  # 防止除零

        if top_k is not None:
            # 只保留 top_k 个最大值,其余设为 -inf(softmax≈0)
            values, indices = torch.topk(last_logits, k=top_k, dim=-1)
            mask = torch.full_like(last_logits, float('-inf'))
            last_logits = mask.scatter(1, indices, values)

        probs = torch.softmax(last_logits, dim=-1)   # [1, V]
        next_id = torch.multinomial(probs, num_samples=1)  # [1, 1](随机采样)
        input_ids = torch.cat([input_ids, next_id], dim=1)
        result.append(itos[next_id.item()])

    return ''.join(result)

# 示例:
print(generate(model, start_text="To be, or not to be", length=300, temperature=0.9, top_k=40))
阶段模型数据集建议项目建议
Step 1RNNchar-rnn(字符级)字符生成器
Step 2LSTM / GRUnames.txt, IMDB姓名分类、情感分析
Step 3TransformerWikitext2, Penn Treebank文本生成
Step 4Seq2Seq / AttentionMulti30k, IWSLT英德翻译
Step 5时间序列预测股票 / 天气 CSVLSTM回归预测